
write multiple documents in a single operation #467

Merged
merged 8 commits into main on Dec 23, 2024
Conversation

@jchris (Contributor) commented Dec 19, 2024

this allows inserting multiple records in a single CRDT operation. Some workloads may see a benefit, since larger datasets can be written with fewer log entries.
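As a rough usage sketch (hedged: the method name and response shape follow the CodeRabbit walkthrough below, and fireproof() is the library's usual entry point; treat the details as assumptions):

import { fireproof } from "@fireproof/core";

async function demo() {
  const db = fireproof("bulk-demo");

  // One CRDT operation (and one log entry) covering many documents.
  const res = await db.bulk([
    { _id: "alice", role: "admin" },
    { _id: "bob", role: "user" },
  ]);

  console.log(res.ids);   // ids of the written documents
  console.log(res.clock); // resulting CRDT clock head
}

demo().catch(console.error);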

Summary by CodeRabbit

  • New Features

    • Introduced a bulk method for processing multiple documents in the ledger.
    • Enhanced WriteQueue with bulk processing capabilities and improved error handling.
    • Added new configuration options for the write queue.
  • Bug Fixes

    • Improved resource management in the shellClose method.
  • Tests

    • Added a test case for the new bulk method in the ledger.
    • Restructured and renamed test cases for clarity in the Ledger functionality tests.
    • Standardized write queue configuration across multiple CRDT test cases.
    • Updated test configurations in the indexer test suite to utilize default write queue options.

@@ -21,7 +23,7 @@ export function writeQueue<T extends DocTypes>(worker: WorkerFunction<T>, payloa
   isProcessing = true;

   const tasksToProcess = queue.splice(0, payload);
-  const updates = tasksToProcess.map((item) => item.task);
+  const updates = tasksToProcess.flatMap((item) => item.task || item.tasks || []);
@jchris (author) commented:

this is ugly, why not just have task be DocUpdate<T> | DocUpdate<T>[]?
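For illustration, a hypothetical normalizer that such a union type would pair with (assumes DocTypes and DocUpdate are exported from the package types; this is not code from the PR):

import type { DocTypes, DocUpdate } from "@fireproof/core";

// Accept one update or many; always hand the worker an array.
function toUpdates<T extends DocTypes>(task: DocUpdate<T> | DocUpdate<T>[]): DocUpdate<T>[] {
  return Array.isArray(task) ? task : [task];
}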

@jchris jchris requested a review from mabels December 20, 2024 19:50
@coderabbitai bot commented Dec 23, 2024

Walkthrough

The pull request introduces enhancements to the document management system, focusing on bulk document operations and write queue configuration. The changes span multiple files, including ledger.ts, types.ts, and write-queue.ts. A new bulk method is added to the Ledger interface, allowing multiple documents to be processed simultaneously. The write queue functionality is expanded with configurable chunk sizes and improved task processing, providing more flexible and efficient document handling.

Changes

  • src/ledger.ts: Added the bulk method to the Ledger interface and LedgerImpl class; updated the constructor to initialize _writeQueue with new parameters; modified shellClose to close _writeQueue; added a writeQueue property to the LedgerOpts interface.
  • src/types.ts: Added an optional writeQueue to ConfigOpts (see the configuration sketch after this list); introduced the new BulkResponse interface with ids, clock, and an optional name.
  • src/write-queue.ts: Added the WriteQueueParams interface with chunkSize; expanded WriteQueue with bulk and close methods; implemented WriteQueueImpl with improved task processing.
  • src/index.ts: Added an export for defaultWriteQueueOpts.
  • tests/fireproof/fireproof.test.ts: Added a test case for bulk document insertion.
  • tests/fireproof/ledger.test.ts: Updated test configurations with write queue chunk sizes; reorganized test suites for parallel writes.
  • tests/fireproof/crdt.test.ts: Integrated defaultWriteQueueOpts into dbOpts for the CRDT tests.
  • tests/fireproof/indexer.test.ts: Integrated defaultWriteQueueOpts into dbOpts for the indexer tests.
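A hedged configuration sketch based on the changes above (assumes fireproof() forwards these ConfigOpts to the ledger; the chunk sizes mirror the ones used in the tests):

import { fireproof, defaultWriteQueueOpts } from "@fireproof/core";

// chunkSize 1 keeps write results ordered in time; the default is 32.
const ordered = fireproof("ordered-ledger", {
  writeQueue: defaultWriteQueueOpts({ chunkSize: 1 }),
});

const batched = fireproof("batched-ledger", {
  writeQueue: defaultWriteQueueOpts({}), // falls back to chunkSize 32
});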

Sequence Diagram

sequenceDiagram
    participant Client
    participant Ledger
    participant WriteQueue
    participant Worker

    Client->>Ledger: bulk(documents)
    Ledger->>WriteQueue: bulk(documents)
    WriteQueue-->>WriteQueue: Process in chunks
    loop Process Tasks
        WriteQueue->>Worker: Process task batch
        Worker-->>WriteQueue: Task results
    end
    WriteQueue-->>Ledger: Bulk operation response
    Ledger-->>Client: Document IDs and metadata


@coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (8)
src/write-queue.ts (5)

14-14: Consider accepting both single and multiple tasks in one property.
While this approach is valid, you may explore a type union for the property (e.g., task?: DocUpdate<T> | DocUpdate<T>[]) to unify the handling.


26-32: Check for valid configuration in defaultWriteQueueOpts.
Consider validating that opts.chunkSize > 0 or throwing an error for invalid input, instead of silently accepting.


34-47: Potential constructor safeguard.
You might validate or sanitize opts before assignment, ensuring chunkSize is within a safe range. Also consider logging if an unexpected value is received.


78-82: Bulk method design is coherent.
The logic is aligned with the new queue structure. Consider adding optional concurrency controls if large bulks can overwhelm downstream processes.


84-89: Push method is consistent with bulk.
Both methods unify well under the same queue-based approach. Might consider factoring out shared logic to avoid duplication if it grows more complex.
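Per the verification notes later in this review, the factoring already collapses a single write into a one-element bulk. A sketch of that shape (method bodies are assumptions, not quotes from the PR):

push(task: DocUpdate<T>): Promise<MetaType> {
  // A single write is just a one-element bulk write.
  return this.bulk([task]);
}

bulk(tasks: DocUpdate<T>[]): Promise<MetaType> {
  return new Promise<MetaType>((resolve, reject) => {
    this.queue.push({ tasks, resolve, reject });
    void this.process();
  });
}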

tests/fireproof/ledger.test.ts (3)

236-268: Consider adding error scenario test cases.

The test suite verifies the happy path for ordered writes, but it would be beneficial to add test cases for error scenarios, such as:

  • Network failures during writes
  • Concurrent writes with conflicts
  • Invalid document formats
it("should handle write failures gracefully", async () => {
  // Mock a network failure
  const failingDoc = { _id: "fail-test", hello: "world" };
  // ... test implementation
});

329-330: Remove commented console.log statement.

Clean up the test code by removing the commented debug statement.

rows.sort((a, b) => a.key.localeCompare(b.key));
-// console.log(rows);
for (let i = 0; i < 10; i++) {

Line range hint 236-330: Consider implementing a shared test factory for write queue configurations.

The two test suites share similar setup and verification logic but with different chunk sizes. Consider creating a shared test factory that can run the same test cases with different configurations to reduce code duplication and make it easier to test additional configurations in the future.

Example approach:

const testWriteQueue = (description: string, config: { chunkSize: number }) => {
  describe(`basic Ledger parallel writes / ${description}`, () => {
    // Shared test setup and cases
  });
};

testWriteQueue("public ordered", { chunkSize: 1 });
testWriteQueue("public parallel", { chunkSize: 32 });
🔇 Additional comments (20)
src/write-queue.ts (8)

1-2: Imports look good.
No issues spotted with these import statements.


9-10: New methods in WriteQueue interface look appropriate.
The addition of the “bulk” and “close” methods aligns with the PR objective to handle multiple documents and finalize the queue’s work.


20-24: Parameter validation suggestion.
Ensure that bare minimum constraints (e.g., chunkSize ≥ 1) are enforced or documented, preventing unintended behavior (like slice(0, 0)) when chunkSize is zero or negative.


55-60: Concurrency note.
When multiple async calls arrive, if isProcessing is false, they will all call process concurrently. This is effectively throttled by the isProcessing guard, so it should be fine, but keep an eye out for potential race conditions or missed edge cases.


66-67: Cleanly handling successful resolution.
Good job returning the result from the worker and fulfilling the promise in a structured way.


69-69: Error handling is well structured.
Rejecting tasks individually with error logs is a solid approach, ensuring that one failing task doesn’t block the entire batch.


90-94: Graceful shutdown.
Returning a Promise that resolves when the queue is empty is a clean design. Ensure that potential unhandled rejections or errors don’t prevent resolving.
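A sketch of how close() and the empty-queue check can interact (the close() body matches the snippet quoted later in this thread; the testEmptyQueue body is an assumption):

close(): Promise<void> {
  this.waitForEmptyQueue = new Future();
  this.testEmptyQueue();
  return this.waitForEmptyQueue.asPromise();
}

private testEmptyQueue() {
  // Resolve the close() promise once nothing is queued or in flight.
  if (this.waitForEmptyQueue && this.queue.length === 0 && !this.isProcessing) {
    this.waitForEmptyQueue.resolve();
  }
}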


97-98: Factory function is straightforward.
No suggestions here; returning the implementation directly is concise and clear.

src/ledger.ts (8)

3-3: Imports are correct and consistent.
These newly included items align with the new write queue features.


18-18: BulkResponse type usage.
Ensuring references to BulkResponse remain consistent across files is crucial. No issue spotted.


53-53: LedgerOpts now requires writeQueue.
Be sure that external usages of LedgerOpts construct objects with this property, or fall back to a default.


79-79: New bulk method in Ledger interface.
This seamlessly extends the Ledger capabilities for multi-document operations.


113-113: Using defaultWriteQueueOpts is a good fallback approach.
This helps ensure chunkSize and other settings are kept stable.


169-171: Delegating ledgerShell bulk to ledgerImpl.
Clean pass-through design. The separation of concerns is clear.


236-236: Ensuring queue is closed upon shellClose.
This is essential to gracefully release resources. Good approach.


269-269: Customized worker for multi-doc CRDT updates.
Tying the ledger’s CRDT “bulk” method to the queue’s worker is a nice example of composable architecture.
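A hypothetical sketch of that wiring in the ledger constructor (the writeQueue factory signature and field names are assumptions inferred from snippets elsewhere in this review):

this._writeQueue = writeQueue(
  this.sthis,
  // Each chunk of queued updates becomes one CRDT bulk operation.
  async (updates: DocUpdate<DT>[]) => await this.crdt.bulk(updates),
  this.opts.writeQueue,
);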

src/types.ts (3)

9-9: Cross-file import references.
This import from write-queue.js ensures type consistency across modules.


122-122: Optional Partial.
Allowing partial overrides is flexible. Just ensure defaults are safely provided.


306-310: BulkResponse interface additions.
Straightforward structure that matches usage in ledger and queue logic. Looks good.
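For reference, the shape the walkthrough describes (member types are assumptions; ClockHead is the library's existing clock type):

export interface BulkResponse {
  readonly ids: string[];    // ids of the documents written in this operation
  readonly clock: ClockHead; // resulting CRDT clock head
  readonly name?: string;    // optional ledger name
}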

tests/fireproof/fireproof.test.ts (1)

142-150: Bulk operation test coverage is good.
Including multiple documents and verifying retrieval ensures functionality across the entire workflow. Consider also testing error handling (e.g., invalid input) in future tests.
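A sketch of the suggested error-handling follow-up (hypothetical test, not part of this PR):

it("bulk rejects invalid input", async () => {
  const db = fireproof("bulk-error-test");
  // Hypothetical expectation: a non-document entry fails the whole call.
  await expect(db.bulk([{ _id: "ok", hello: "world" }, null as never])).rejects.toThrow();
});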

Comment on lines +287 to +292
it("should resolve to one head", async () => {
const crdt = db.crdt;
expect(crdt.clock.head.length).toBe(9);
await db.put({ _id: "id-10", hello: "world" });
expect(crdt.clock.head.length).toBe(1);
});

🛠️ Refactor suggestion

Avoid hardcoding expected head count assumptions.

The test assumes specific head counts (9 and 1) which could make the test brittle. Consider making the assertions more flexible to accommodate implementation changes while still verifying the core behavior - that multiple heads eventually resolve to one.

-expect(crdt.clock.head.length).toBe(9);
+expect(crdt.clock.head.length).toBeGreaterThan(1);
await db.put({ _id: "id-10", hello: "world" });
expect(crdt.clock.head.length).toBe(1);

@coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (1)
src/write-queue.ts (1)

55-76: Consider enhancing error handling and process scheduling.

Two potential improvements for the process method:

  1. Consider adding batch-level error handling to catch and log any unexpected errors during processing.
  2. Replace the recursive void this.process() call with a non-recursive approach to prevent potential stack overflow with large queues.
-    void this.process();
+    setImmediate(() => void this.process());
🔇 Additional comments (9)
src/write-queue.ts (5)

20-32: LGTM! Well-documented interface and sensible defaults.

The WriteQueueParams interface and defaultWriteQueueOpts function are well-designed with clear documentation and reasonable defaults.


34-46: LGTM! Well-structured class implementation.

The class is well-organized with proper initialization and logging setup.


48-53: LGTM! Proper queue state management.

The queue state tracking is well-implemented using Future for async operations.


78-89: LGTM! Consistent implementation of bulk and push operations.

The methods are well-implemented with consistent patterns for task queueing.


90-98: LGTM! Clean implementation of close method and factory.

The close method properly manages queue cleanup and the factory function follows good practices.

src/ledger.ts (2)

52-53: LGTM! Well-integrated write queue configuration.

The writeQueue configuration is properly integrated into the LedgerOpts interface.


305-320: LGTM! Robust implementation of bulk document operations.

The bulk method properly handles document processing with appropriate ID generation and error propagation.

tests/fireproof/ledger.test.ts (2)

236-268: LGTM! Comprehensive tests for ordered write operations.

The test suite properly verifies the behavior of ordered writes, including head count and change order.


287-292: Avoid hardcoding expected head count assumptions.

The test assumes specific head counts (9 and 1) which could make the test brittle. Consider making the assertions more flexible to accommodate implementation changes while still verifying the core behavior.

@coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (1)
tests/fireproof/crdt.test.ts (1)

466-466: Applying default queue in Loader with many transactions.

Ensures concurrency patterns remain uniform in large-scale commit tests. Keep an eye on potential throughput or memory constraints when scaling.

🔇 Additional comments (12)
src/index.ts (1)

8-8: Looks good!

The export statement cleanly re-exports the default queue options for external usage. This is consistent with the other export patterns in the file. No further changes needed.

tests/fireproof/indexer.test.ts (2)

1-13: Use of defaultWriteQueueOpts in import block.

The addition of defaultWriteQueueOpts is consistent with the updated write queue configuration and helps maintain a standardized approach to queuing. No issues found here.


293-293: Ensuring proper usage of defaultWriteQueueOpts.

By assigning writeQueue: defaultWriteQueueOpts({}), you align this test with the new default write queue configuration. Verify that any custom write queue settings are, indeed, intentionally left at defaults.

✅ Verification successful

Default write queue configuration is consistent with test patterns

The usage of defaultWriteQueueOpts({}) in this test file follows the established pattern across the test suite. The search results show that this is the standard configuration used in most test cases, with only specific performance-related tests (test-parallel-writes and test-parallel-writes-ordered) explicitly overriding the defaults. The default configuration sets a chunkSize of 32, which is appropriate for general testing scenarios.


tests/fireproof/crdt.test.ts (9)

1-1: New import for defaultWriteQueueOpts.

The import of defaultWriteQueueOpts introduces a unified way to configure write queues for your CRDT tests. Looks good.


61-61: Reusing the same default write queue configuration.

This standardization simplifies test maintenance. No immediate concerns.


117-117: Consistent defaultWriteQueueOpts usage for multi-writes.

Retaining consistency helps cover multi-write scenarios uniformly. All good.


188-188: Checking defaultQueueOpts in two multi-writes scenario.

Ensures that multiple batch operations use the same queue settings. No issues found.


304-304: Index-related test also applying defaultWriteQueueOpts.

Maintains a unified testing context. No further action needed.


410-410: Second Loader test with default queue.

Great to see consistent usage. No issues surfaced.


242-242: Extended usage of default write queue in compaction scenario.

Makes compaction tests consistent with the rest of CRDT tests. Review for performance in large data sets.

✅ Verification successful

Default write queue configuration is consistently used across CRDT tests including compaction scenarios

The codebase shows consistent usage of defaultWriteQueueOpts({}) across all CRDT test scenarios, including compaction tests. The write queue configuration is uniform and there's no evidence of performance issues in large dataset tests. The compaction tests are properly integrated with the same write queue settings as other CRDT operations.

18-18: Applying default write queue options.

Using defaultWriteQueueOpts for dbOpts ensures consistent behavior across tests. Confirm that these defaults meet the test scenarios' performance expectations.

✅ Verification successful

Default write queue configuration is appropriate for test scenarios.

The defaultWriteQueueOpts sets a chunkSize of 32, which is suitable for test scenarios as it provides a good balance between performance and memory usage. The test file correctly uses these defaults, and the test suite includes specific tests with different chunk sizes (as seen in ledger.test.ts) to verify behavior under various configurations. No changes needed.

356-356: Loader test with default write queue.

This ensures write behavior remains aligned with the rest of the codebase. Confirm any special loader edge cases are still valid.

✅ Verification successful

Write queue configuration is correctly tested for loader scenarios

The test suite thoroughly verifies the write queue behavior with loader operations through multiple test cases:

  • Single transaction commits are verified with default chunk size
  • Multiple transaction commits are tested to ensure proper queuing
  • Car file loading and metadata integrity are validated
  • The default write queue options (chunk size: 32) are consistently applied across all loader tests

@coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (4)
src/write-queue.ts (4)

20-24: Enhance WriteQueueParams documentation.

The interface documentation should clarify:

  • The purpose and impact of chunkSize
  • Valid range of values
  • Performance implications
 export interface WriteQueueParams {
-  // default 32
-  // if chunkSize is 1 the result will be ordered in time
+  /**
+   * Maximum number of tasks to process in a single batch.
+   * - Default: 32
+   * - Set to 1 for strict time-ordered processing
+   * - Higher values improve throughput but may affect ordering
+   * @minimum 1
+   */
   readonly chunkSize: number;
 }

14-15: Consider using a union type for tasks as suggested.

The current implementation uses an optional tasks array, but a union type might be clearer as suggested in the previous review.

 interface WriteQueueItem<T extends DocTypes> {
-  // readonly task?: DocUpdate<T>;
-  readonly tasks?: DocUpdate<T>[];
+  readonly task: DocUpdate<T> | DocUpdate<T>[];
   resolve(result: MetaType): void;
   reject(error: Error): void;
 }

55-76: Enhance process method robustness.

The current implementation has several areas for improvement:

  1. No timeout handling for worker operations
  2. Memory usage could grow unbounded without queue size limits
  3. Error handling could be more informative

Consider implementing:

  1. Worker timeout mechanism
  2. Queue size limits
  3. Detailed error reporting

Example implementation:

private readonly MAX_QUEUE_SIZE = 10000;
private readonly WORKER_TIMEOUT = 30000; // 30 seconds

private async process() {
  if (this.isProcessing || this.queue.length === 0) {
    this.testEmptyQueue();
    return;
  }
  
  if (this.queue.length > this.MAX_QUEUE_SIZE) {
    this.logger.Warn().Int("queueSize", this.queue.length).Msg("Queue size exceeds limit");
  }

  this.isProcessing = true;
  const tasksToProcess = this.queue.splice(0, this.opts.chunkSize);
  
  const processWithTimeout = async (update: DocUpdate<T>[], index: number) => {
    const timeoutPromise = new Promise((_, reject) => {
      setTimeout(() => reject(new Error("Worker timeout")), this.WORKER_TIMEOUT);
    });
    
    try {
      const result = await Promise.race([this.worker(update), timeoutPromise]);
      tasksToProcess[index].resolve(result);
    } catch (error) {
      const enhancedError = new Error(
        `Task processing failed: ${error.message}`,
        { cause: error }
      );
      tasksToProcess[index].reject(enhancedError);
    }
  };

  // ... rest of the implementation
}

78-94: Consider adding queue management features.

The current implementation could benefit from additional queue management capabilities:

  • Queue status monitoring
  • Ability to pause/resume processing
  • Queue statistics

Example additions:

interface WriteQueue<T extends DocTypes> {
  // ... existing methods ...
  
  /** Get current queue status */
  status(): {
    length: number;
    isProcessing: boolean;
    isPaused: boolean;
  };
  
  /** Pause processing */
  pause(): void;
  
  /** Resume processing */
  resume(): void;
}

Comment on lines 42 to 46
constructor(sthis: SuperThis, worker: WorkerFunction<T>, opts: WriteQueueParams) {
  this.logger = ensureLogger(sthis, "WriteQueueImpl");
  this.worker = worker;
  this.opts = opts;
}

🛠️ Refactor suggestion

Add validation for WriteQueueParams.

The constructor should validate the chunkSize parameter to ensure it's positive and within reasonable bounds.

   constructor(sthis: SuperThis, worker: WorkerFunction<T>, opts: WriteQueueParams) {
     this.logger = ensureLogger(sthis, "WriteQueueImpl");
     this.worker = worker;
+    if (opts.chunkSize < 1) {
+      throw new Error("chunkSize must be at least 1");
+    }
     this.opts = opts;
   }

Committable suggestion skipped: line range outside the PR's diff.

@coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (3)
src/write-queue.ts (3)

14-14: Remove commented code.

Remove the commented line as it's no longer needed and can cause confusion.

-  // readonly task?: DocUpdate<T>;

26-31: Consider adding upper bound validation and documentation.

While the implementation handles the lower bound well, consider these improvements:

  1. Add an upper bound for chunkSize to prevent potential memory issues with very large chunks
  2. Document why 32 was chosen as the default value
 export function defaultWriteQueueOpts(opts: Partial<WriteQueueParams> = {}): WriteQueueParams {
+  const DEFAULT_CHUNK_SIZE = 32;
+  const MAX_CHUNK_SIZE = 1000; // or another appropriate limit
+  
+  let chunkSize = opts.chunkSize || DEFAULT_CHUNK_SIZE;
+  if (chunkSize <= 0) {
+    chunkSize = DEFAULT_CHUNK_SIZE;
+  } else if (chunkSize > MAX_CHUNK_SIZE) {
+    chunkSize = MAX_CHUNK_SIZE;
+  }
+  
   return {
     ...opts,
-    chunkSize: opts.chunkSize && opts.chunkSize > 0 ? opts.chunkSize : 32,
+    chunkSize,
   };
 }

86-90: Improve cleanup in close method.

The close method should clear any pending tasks to prevent memory leaks.

 close(): Promise<void> {
   this.waitForEmptyQueue = new Future();
   this.testEmptyQueue();
+  // Clear any pending tasks to prevent memory leaks
+  this.queue.length = 0;
   return this.waitForEmptyQueue.asPromise();
 }
🔇 Additional comments (3)
src/write-queue.ts (3)

9-10: LGTM! Well-structured interface changes.

The new interfaces and methods are well-designed:

  • bulk method aligns with the PR objective of handling multiple documents
  • close method provides proper cleanup
  • WriteQueueParams interface is well-documented

Also applies to: 20-24


93-94: LGTM! Clean factory function implementation.

The factory function is well-designed and properly typed.


1-94: Verify bulk operation implementation across the codebase.

The implementation aligns well with the PR objective of enabling multiple document writes in a single operation. Let's verify the integration points.

✅ Verification successful

Bulk operation implementation is correctly integrated and thread-safe

The implementation shows proper integration with:

  • Single write operations are converted to bulk operations (push calls bulk([task]))
  • Queue processing is synchronized with isProcessing flag
  • Chunked processing with configurable size
  • Proper error handling and promise resolution
  • Tests verify both single and bulk operations

The implementation is used correctly across the codebase:

  • In ledger.ts for document operations
  • In crdt.ts for handling document updates
  • In indexer-helpers.ts for index updates

Comment on lines 54 to 75
private async process() {
  if (this.isProcessing || this.queue.length === 0) {
    this.testEmptyQueue();
    return;
  }
  this.isProcessing = true;
  this.logger.Debug().Any("opts", this.opts).Len(this.queue).Msg("Processing tasks");
  const tasksToProcess = this.queue.splice(0, this.opts.chunkSize);
  const updates = tasksToProcess.map((item) => item.tasks).filter((item) => item) as DocUpdate<T>[][];
  const promises = updates.map(async (update, index) => {
    try {
      const result = await this.worker(update);
      tasksToProcess[index].resolve(result);
    } catch (error) {
      tasksToProcess[index].reject(this.logger.Error().Err(error).Msg("Error processing task").AsError());
    }
  });
  await Promise.allSettled(promises);
  this.logger.Debug().Any("opts", this.opts).Len(this.queue).Msg("Processed tasks");
  this.isProcessing = false;
  setTimeout(() => this.process(), 0);
}

🛠️ Refactor suggestion

Improve error handling and processing robustness.

The process method has potential issues:

  1. If an error occurs during processing, isProcessing might stay true, causing a deadlock
  2. Using setTimeout(fn, 0) for recursive processing could be improved
 private async process() {
   if (this.isProcessing || this.queue.length === 0) {
     this.testEmptyQueue();
     return;
   }
   this.isProcessing = true;
+  try {
     this.logger.Debug().Any("opts", this.opts).Len(this.queue).Msg("Processing tasks");
     const tasksToProcess = this.queue.splice(0, this.opts.chunkSize);
     const updates = tasksToProcess.map((item) => item.tasks).filter((item) => item) as DocUpdate<T>[][];
     const promises = updates.map(async (update, index) => {
       try {
         const result = await this.worker(update);
         tasksToProcess[index].resolve(result);
       } catch (error) {
         tasksToProcess[index].reject(this.logger.Error().Err(error).Msg("Error processing task").AsError());
       }
     });
     await Promise.allSettled(promises);
     this.logger.Debug().Any("opts", this.opts).Len(this.queue).Msg("Processed tasks");
+  } catch (error) {
+    this.logger.Error().Err(error).Msg("Fatal error in process loop");
+  } finally {
     this.isProcessing = false;
-    setTimeout(() => this.process(), 0);
+    queueMicrotask(() => this.process());
+  }
 }

@mabels mabels merged commit eae3b6f into main Dec 23, 2024
3 checks passed
@mabels mabels deleted the bulk branch December 23, 2024 13:28
@jchris jchris mentioned this pull request Dec 23, 2024